In [ ]:
!pip install castra graphviz # Python packages: castra and the graphviz bindings
!apt-get install -y graphviz # the system graphviz binary, used to render task graphs
# render plots inline in this notebook
%matplotlib inline
import dask # for parallel computing
from distributed import Executor, progress # for distributed parallel computing
The first thing to do is create a Dask Executor: the object we'll use to submit jobs to our cluster.
In [ ]:
e = Executor('dask.informaticslab.co.uk:8786')
print(e)
You should now see how many nodes are available in the Executor. Ask Niall to show you how we scale this up and down using AWS.
In [ ]:
import socket
getIP = lambda: "Hello world from IP " + socket.gethostbyname(socket.gethostname())
getIP()
The cell above ran on the same computer that this notebook is running on. Let's submit it to the compute cluster using our Executor e.
You should open this window and click on Status, where you can watch all the jobs (yours and others') being submitted to the compute nodes.
In [ ]:
e.run(getIP)
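The run call above executes a function on every worker. More generally, the distributed Executor was designed to mirror Python's concurrent.futures interface (submit, map, result), so the same pattern can be sketched locally, without a cluster, using the standard library's ThreadPoolExecutor:

```python
# A local, cluster-free sketch of the submit/map/result pattern that
# the distributed Executor also exposes.
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

with ThreadPoolExecutor(max_workers=4) as pool:
    future = pool.submit(square, 7)       # schedule one call
    results = list(pool.map(square, range(5)))  # schedule many calls
    print(future.result())                # block until done -> 49
    print(results)                        # -> [0, 1, 4, 9, 16]
```

With the distributed Executor, submit and map return futures whose results live on the cluster until you ask for them, but the calling code looks much the same.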
In [ ]:
def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

def process_list(xs):
    output = []
    for x in xs:
        a = inc(x)
        b = double(x)
        c = add(a, b)
        output.append(c)
    return output

def mysum(xs):
    total = 0
    for x in xs:
        total += x
    return total
In [ ]:
double(2)
In [ ]:
inc(double(2))
In [ ]:
data = [1, 2, 3, 4, 5]
result = mysum(process_list(data))
In [ ]:
print(result)
So far so normal. Now for the magic of the wonderful dask.delayed decorator.
If we decorate the same functions, they become delayed functions. This means they are not executed straight away; instead they sit waiting to be mapped onto our Executor e.
In [ ]:
@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def double(x):
    return x * 2

@dask.delayed
def add(x, y):
    return x + y

def process_list(xs):
    output = []
    for x in xs:
        a = inc(x)
        b = double(x)
        c = add(a, b)
        output.append(c)
    return dask.delayed(output)

@dask.delayed
def mysum(xs):
    total = 0
    for x in xs:
        total += x
    return total
In [ ]:
result = double(2)
In [ ]:
print(result)
In [ ]:
print(result.compute())
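Nothing ran until we called .compute(): the delayed call only builds a task graph. A minimal self-contained sketch of this laziness (it redefines inc and double locally so it stands on its own):

```python
# Calling a dask.delayed function builds a task graph instead of
# running the function body; .compute() executes the graph.
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def double(x):
    return x * 2

lazy = inc(double(2))    # no work done yet: this is a Delayed object
value = lazy.compute()   # now the graph runs: double(2) -> 4, inc(4) -> 5
print(value)             # -> 5
```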
In [ ]:
data = [1, 2, 3, 4, 5]
result = mysum(process_list(data)) # process_list already returns a Delayed, so no extra wrapping is needed
In [ ]:
result
In [ ]:
result.visualize()
In [ ]:
result.compute()
With more data, the task graph is optimised automatically.
In [ ]:
data = range(300)
result = mysum(process_list(data))
result.visualize() # be patient: rendering 300 tasks takes a while!
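Rendering aside, the whole 300-element graph still collapses to a single number when computed. A self-contained local run (using dask's default scheduler, no cluster needed) checks the arithmetic: each element contributes (x + 1) + 2x = 3x + 1, so the total over range(300) is 134850.

```python
# Build the full pipeline as one task graph and compute it locally.
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def double(x):
    return x * 2

@dask.delayed
def add(x, y):
    return x + y

@dask.delayed
def mysum(xs):
    return sum(xs)

data = range(300)
# dask resolves Delayed objects nested inside the list argument
out = mysum([add(inc(x), double(x)) for x in data])
total = out.compute()
print(total)  # -> 134850, since sum of 3x + 1 for x in 0..299 is 3*44850 + 300
```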